France flag

Big Data¶

(EN) Toronto Stock Exchange

(FR) Bourse de Toronto

Eder Valderrama eder.valderrama@usp.br 2023

(EN) Import packages to configure folders and Spark

Project Description:

  • Load a dataset of files with unified information.
  • Separate the full dataset into smaller datasets split by column value, using techniques to handle big data and reduce loading and execution time.
  • The dataset consists of listed companies on the Toronto Stock Exchange.
  • Split the companies and details by the sector of activities.
  • Evaluate the statistics of listed companies

USA flag

(FR) Importer les packages pour configurer les dossiers et Spark

Description du projet :

  • Charger un ensemble de fichiers de données avec des informations unifiées.
  • Séparer l'ensemble complet de données en ensembles plus petits, divisés selon la valeur des colonnes, en utilisant des techniques de traitement des big data pour réduire le temps de chargement et d'exécution.
  • L'ensemble de données comprend des entreprises cotées à la Bourse de Toronto.
  • Diviser les entreprises et les détails selon le secteur d'activité.
  • Évaluer les statistiques des entreprises cotées.

France flag

In [1]:
import os
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark import SparkConf
import pandas as pd
import plotly.express as px
import matplotlib.pyplot as plt

The parquet file¶


Le fichier Parquet¶

  • (EN) The Parquet file is optimized to handle big data. It offers better size compression than .csv or other formats when saving to disk. However, Parquet files are intensive users of RAM memory.

  • (FR) Le fichier Parquet est optimisé pour traiter de gros volumes de données. Il offre une meilleure compression de taille que les fichiers .csv ou d'autres formats lors de l'enregistrement sur le disque. Cependant, les fichiers Parquet sont gourmands en mémoire RAM.
In [2]:
dirname_parquet = Path('./Canada/')
folder_name = os.path.join(os.getcwd(), dirname_parquet)
folder_name
Out[2]:
'/app/Dev/Project/00_portfolio/00_stock_market_CA/Canada'
In [3]:
file_name_path_i = os.path.join(folder_name, \
                                "dataframe_companies_full.parquet")
file_name_path_i
Out[3]:
'/app/Dev/Project/00_portfolio/00_stock_market_CA/Canada/dataframe_companies_full.parquet'

Spark memory configuration¶


Configuration de la mémoire Spark¶

  • (EN) The performance of executing a PySpark script depends on the configuration of available memory. Setting the right configuration is important for performance and to guarantee that all steps of the analysis will be executed without memory errors.

  • (FR) Les performances d'exécution d'un script PySpark dépendent de la configuration de la mémoire disponible. Il est essentiel de définir la bonne configuration pour garantir des performances optimales et s'assurer que toutes les étapes de l'analyse seront exécutées sans erreurs de mémoire.
In [4]:
configuration_choice = ['balance', 'moderate']
configuration_choice = configuration_choice[1]
print(configuration_choice)
moderate
In [5]:
conf = SparkConf()
if configuration_choice == 'balance':
    # Heap size for the driver
    conf.set("spark.driver.memory", "1g")    
    # Heap size for the executors 
    conf.set("spark.executor.memory", "5g") 
    # No. of executors (equal to the # machine cores)
    conf.set("spark.executor.instances", "8")
    # No. of partitions (equal to the # machine cores)
    conf.set("spark.default.parallelism", "8")
elif configuration_choice == 'moderate':
    conf.set("spark.driver.memory", "2g") 
    conf.set("spark.executor.memory", "4g")
    conf.set("spark.executor.instances", "6")
    conf.set("spark.default.parallelism", "8")

Spark-python integration¶


Intégration Spark-Python¶

  • (EN) Start the server with Spark using memory configuration and an alias for the app. Check the status of the connection. Run locally without applying clustering procedures.

  • (FR) Démarrer le serveur avec Spark en utilisant une configuration de mémoire et un alias pour l'application. Vérifier l'état de la connexion. Exécuter localement sans appliquer de procédures de regroupement (clustering)
In [6]:
%%time
spark = SparkSession.builder.\
            config(conf=conf).\
            appName("ParquetSplit").\
            getOrCreate()
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/25 16:28:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
CPU times: user 46.4 ms, sys: 41.7 ms, total: 88.1 ms
Wall time: 7.48 s
In [7]:
spark.getActiveSession()
Out[7]:

SparkSession - in-memory

SparkContext

Spark UI

Version
v3.4.1
Master
local[*]
AppName
ParquetSplit

Load a Parquet file in Spark as a DataFrame¶


Charger un fichier Parquet dans Spark en tant que DataFrame¶

  • (EN) Create a Spark dataframe and evaluate the schema with name of variables, types and structure of columns

  • (FR) Créez un DataFrame Spark et examinez le schéma, y compris les noms des variables, les types et la structure des colonnes.
In [8]:
%%time
df = spark.read.parquet(file_name_path_i)
                                                                                
CPU times: user 67.1 ms, sys: 9.5 ms, total: 76.6 ms
Wall time: 1min 58s
In [9]:
%%time
df.printSchema()
root
 |-- url_company: string (nullable = true)
 |-- description: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- website: string (nullable = true)
 |-- company_name: string (nullable = true)
 |-- codes: string (nullable = true)

CPU times: user 4.95 ms, sys: 342 µs, total: 5.29 ms
Wall time: 34 ms

Path to save partitions¶


Chemin pour enregistrer les partitions¶

  • (EN) The Parquet file needs to be partitioned by sector. It is important to have only companies of the same sector in the same DataFrame to perform similarity analyses. The path of the new folder needs to be empty to create partitions

  • (FR) Le fichier Parquet doit être partitionné par secteur. Il est important de n'avoir que des entreprises du même secteur dans le même DataFrame pour effectuer des analyses de similarité. Le chemin du nouveau dossier doit être vide pour créer les partitions.
In [10]:
file_name_dir_partition = os.path.join(folder_name, \
                            'parquet_partitioned_by_sector')
file_name_dir_partition
Out[10]:
'/data/Dev/Project/00_portfolio/00_stock_market_CA/Canada/parquet_partitioned_by_sector'
In [11]:
os.path.exists(file_name_dir_partition)
Out[11]:
False
In [12]:
%%time
df.write.partitionBy("sector").parquet(file_name_dir_partition)
                                                                                
CPU times: user 157 ms, sys: 43.5 ms, total: 201 ms
Wall time: 6min 5s

Check the result of the split¶


Vérifiez le résultat de la division¶

  • (EN) The list of files inside the folder shows that the script has created new folders for each sector found in the Toronto Stock Exchange.

  • (FR) La liste des fichiers à l'intérieur du dossier montre que le script a créé de nouveaux dossiers pour chaque secteur trouvé à la Bourse de Toronto (Toronto Stock Exchange).
In [13]:
os.listdir(file_name_dir_partition)
Out[13]:
['._SUCCESS.crc',
 'sector=-',
 'sector=Consumer Discretionary',
 'sector=Consumer Staples',
 'sector=Energy',
 'sector=Finance',
 'sector=Healthcare',
 'sector=Industrials',
 'sector=Materials',
 'sector=Media',
 'sector=Real Estate',
 'sector=Sectors',
 'sector=Technology',
 'sector=Utilities',
 '_SUCCESS']

Statistics of listed companies¶


Statistiques des entreprises cotées¶

  • (EN) Agregate the data of companies per sector and industry. To integrate Spark and Pandas dataframes, apply the "groupby" function and generate data.

  • (FR) Agréger les données des entreprises par secteur et par industrie. Pour intégrer les DataFrames Spark et Pandas, appliquer la fonction 'groupby' et générer les données.
In [14]:
%%time
df_pandas = df.toPandas()
[Stage 1:========================================================>(80 + 1) / 81]
CPU times: user 233 ms, sys: 270 ms, total: 504 ms
Wall time: 1min 52s
                                                                                
In [15]:
%%time
df_grouped = df_pandas.groupby(['sector', 'industry'])['company_name'].count().reset_index()
CPU times: user 8.12 ms, sys: 5.58 ms, total: 13.7 ms
Wall time: 13 ms
In [16]:
df_grouped.head()
Out[16]:
sector industry company_name
0 - - 1263
1 Consumer Discretionary Apparel & Luxury 4
2 Consumer Discretionary Containers & Packaging 6
3 Consumer Discretionary Home and Homeware 3
4 Consumer Discretionary Hotels, Lodging & Leisure 9
  • (EN) The result of the sector and quantity of companies of the Toronto Stock Exchange.

  • (FR) Le résultat du secteur et du nombre d'entreprises de la Bourse de Toronto (Toronto Stock Exchange).
In [17]:
df_grouped_sector = df_pandas.groupby(['sector'])['company_name'].count().reset_index()
df_grouped_sector = df_grouped_sector.rename(columns={'company_name': 'qnt_companies'})
df_grouped_sector.sort_values(by='qnt_companies', in)
Out[17]:
sector qnt_companies
10 Sectors 2
2 Consumer Staples 47
8 Media 50
11 Technology 53
1 Consumer Discretionary 65
5 Healthcare 82
12 Utilities 90
9 Real Estate 93
6 Industrials 105
3 Energy 155
7 Materials 210
4 Finance 366
0 - 1263

Criate graphics to visual output¶


Créer des graphiques pour la sortie visuelle¶

  • (EN) Interactive graphs using Plotly package.

  • (FR) Graphiques interactifs à l'aide du package Plotly.
In [18]:
%%time
fig = px.bar(df_grouped, x='sector', y='company_name', color='industry',
             title='No. Companies per Sector and Industry',
             labels={'company_name': '# Companies'},
             height=500)
fig.show()
CPU times: user 466 ms, sys: 452 ms, total: 918 ms
Wall time: 944 ms
  • (EN) Static graph depicting the quantity of companies per sector, ordered using matplotlib.

  • (FR) Graphique statique représentant la quantité d'entreprises par secteur, triées à l'aide de matplotlib.
In [19]:
plt.figure(figsize=(10, 6))
bars = plt.bar(df_grouped_sector['sector'], df_grouped_sector['qnt_companies'], color='blue')
plt.xlabel('Sector')
plt.ylabel('# Companies')
plt.title('# Companies per Sector')
plt.xticks(rotation=45, ha='right')
for bar in bars:
    height = bar.get_height()
    plt.text(bar.get_x() + bar.get_width() / 2, height, str(int(height)),ha='center', va='bottom', fontsize=10, color='black')
plt.tight_layout()
plt.show()

Stop the Spark server¶


Arrêter le serveur Spark¶

  • (EN) It is important to stop the Spark server because it consumes resources while waiting for new tasks to run. In distributed applications like Databricks/AWS, the active machine's uptime incurs costs associated with the configuration, and it is essential to have control over computational resources.

  • (FR) Il est important d'arrêter le serveur Spark car il consomme des ressources en attendant de nouvelles tâches à exécuter. Dans les applications distribuées telles que Databricks / AWS, le temps d'utilisation de la machine entraîne des coûts associés à la configuration, et il est essentiel d'avoir le contrôle sur les ressources computationnelles.
In [20]:
spark.stop()

Contact¶

Eder Valderrama eder.valderrama@usp.br